package com.flink.examples.mysql;

import com.flink.examples.TUser;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import java.io.Serializable;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Description Reads rows from the MySQL table t_user using several groups of query
 *              parameters, exposes them as a DataSet, and queries that DataSet through
 *              the Table API.
 * @Author JL
 * @Date 2021/08/18
 * @Version V1.0
 */
public class DataSetPageSource {

    /**
     * Column projection shared by the JDBC query and the Table API query.
     * The trailing space allows a WHERE clause to be appended directly.
     */
    private static final String SELECT_USER_SQL =
            "SELECT id,name,age,sex,address,createTimeSeries FROM t_user ";

    /**
     * Runs the example: reads t_user over JDBC with three id ranges, prints the raw rows,
     * maps them to {@link TUser}, registers them as the temporary view "t_user" and prints
     * the row whose id is 8.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Field names and types of the rows returned by the JDBC query, in SELECT order.
        TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .field("sex", DataTypes.INT())
                .field("address", DataTypes.STRING())
                //.field("createTime", DataTypes.TIMESTAMP())
                .field("createTimeSeries", DataTypes.BIGINT())
                .build();

        // Multiple query groups: each row supplies the two values bound to the
        // placeholders of "id>? and id<=?". The array is fully populated BEFORE it is
        // handed to the provider, so correctness does not depend on the provider
        // keeping a reference to an array that is filled in afterwards.
        Serializable[][] idRanges = new Integer[][]{
                {0, 10},
                {10, 20},
                {20, 30}
        };
        ParameterValuesProvider provider = new GenericParameterValuesProvider(idRanges);

        // Configure the JDBC input format that reads the source data from MySQL.
        DataSource<Row> dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(MysqlConfig.DRIVER_CLASS)
                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)
                .setUsername(MysqlConfig.SOURCE_USER)
                .setPassword(MysqlConfig.SOURCE_PASSWORD)
                .setQuery(SELECT_USER_SQL + "where id>? and id<=?")
                .setParametersProvider(provider)
                .setRowTypeInfo(new RowTypeInfo(tableSchema.getFieldTypes()))
                .finish());
        // Print the raw rows read over JDBC.
        // NOTE(review): DataSet#print() is documented to trigger job execution on its own,
        // so the source is presumably read again by the Table query below - confirm that
        // this extra output is still wanted.
        dataSource.print();

        // Convert the generic rows into typed TUser objects.
        DataSet<TUser> dataSet = dataSource.map(new RowToUserMapper());

        // Register the DataSet and its fields as a temporary view.
        tEnv.createTemporaryView("t_user", dataSet,
                $("id"), $("name"), $("age"), $("sex"), $("address"), $("createTimeSeries"));

        // Query a single id from the view and print the result.
        Table table = tEnv.sqlQuery(SELECT_USER_SQL + " where id = " + 8);
        TableResult result = table.execute();
        result.print();
    }

    /**
     * Copies the six positional fields of a JDBC row into a {@link TUser}.
     * The field positions follow the column order of {@link #SELECT_USER_SQL}.
     */
    private static final class RowToUserMapper implements MapFunction<Row, TUser> {

        @Override
        public TUser map(Row value) throws Exception {
            TUser user = new TUser();
            user.setId((Integer) value.getField(0));
            user.setName((String) value.getField(1));
            user.setAge((Integer) value.getField(2));
            user.setSex((Integer) value.getField(3));
            user.setAddress((String) value.getField(4));
            user.setCreateTimeSeries((Long) value.getField(5));
            return user;
        }
    }

}
